5.11. Асинхронность
Асинхронность в Ruby
Часть 1. Основания
Асинхронность — это подход к организации вычислений, при котором операции могут инициироваться без ожидания их немедленного завершения. Исполнение программы продолжается, а результат операции обрабатывается позже, когда он становится доступен.
В Ruby, как и в большинстве языков с блокирующей по умолчанию моделью ввода-вывода, асинхронность становится инструментом повышения эффективности при работе с операциями, время завершения которых не определено заранее и лежит вне контроля программы: сетевые вызовы (HTTP, базы данных, внешние API), дисковые операции, ожидание событий от пользовательского ввода или системных процессов. Типичный сценарий: веб-сервер обрабатывает сотни запросов; каждый из них содержит обращение к внешнему API. Если каждый запрос будет блокировать поток исполнения до получения ответа, то пропускная способность сервера упадёт до уровня, определяемого задержками в сети, а не производительностью самого кода.
Ruby не является языком с «встроенной» асинхронностью уровня языка, в отличие, например, от JavaScript (с его event loop) или Rust (с async/await и runtime). Вместо этого в Ruby асинхронность реализуется на уровне библиотек, фреймворков и соглашений о проектировании, опирающихся на имеющиеся в языке средства: блоки, промисоподобные объекты, потоки (Thread), и, в последнее время — корутины (Fiber) с поддержкой планировщика.
Тем не менее, начиная с версии 3.0, в стандартную библиотеку Ruby вошёл модуль Fiber.scheduler, что ознаменовало переход от чисто библиотечного к частично языковому уровню поддержки асинхронности. Это не привело к изменению семантики языка (нет ключевых слов async/await), но позволило реализовать кооперативную многозадачность с прозрачной заменой блокирующих системных вызовов на неблокирующие через механизм хукинга.
Базовые термины и дифференциация понятий
Прежде чем перейти к реализациям, необходимо уточнить терминологию, часто смешиваемую в обсуждениях:
- Блокирующая операция — операция, при которой поток исполнения останавливается до её завершения. Например, вызов
Net::HTTP.getв стандартной библиотеке блокирует текущий поток на всё время ожидания ответа сервера. - Неблокирующая операция — операция, которая инициируется и возвращает управление немедленно (обычно с дескриптором задачи), позволяя коду продолжить работу. Сама по себе неблокирующая операция не гарантирует асинхронность: её результат всё ещё нужно как-то получить.
- Синхронный вызов — вызов, который завершается только после получения окончательного результата. Он может быть реализован как блокирующим, так и неблокирующим способом (например, через активное опросное ожидание — polling), но с точки зрения вызывающего кода разницы нет: управление возвращается только после завершения.
- Асинхронный вызов — вызов, который возвращает управление до завершения операции. Результат становится доступен позже, и программа должна быть готова его обработать — через коллбэки, промисы, каналы или await-подобные конструкции.
- Параллелизм — выполнение нескольких вычислений одновременно, обычно на разных ядрах процессора или в разных потоках ОС. В Ruby истинный параллелизм ограничен GVL (Global VM Lock), который предотвращает одновременное исполнение Ruby-кода в нескольких нативных потоках.
- Конкурентность — способность системы управлять несколькими задачами, переключаясь между ними, даже если физически они не выполняются одновременно. Асинхронность — один из способов достижения конкурентности.
Ruby по умолчанию предоставляет конкурентность через потоки (Thread), но из-за GVL они не дают выигрыша в производительности для CPU-нагруженных задач. Однако для I/O-нагруженных сценариев потоки эффективны: пока один поток ждёт ответа от сети, другой может обрабатывать уже полученные данные, потому что GVL освобождается при входе в системный вызов (например, read, write, connect). Это ключевой момент: блокирующие I/O в Ruby не блокируют всю виртуальную машину, а лишь текущий поток — и GVL временно снимается, позволяя другим потокам работать.
Таким образом, классическая асинхронность в Ruby исторически развивалась в двух направлениях:
- Многопоточность на основе
Thread— подход, близкий к традиционным языкам (Java, C#), где каждый запрос или задача выполняется в отдельном потоке. Прост в понимании (линейный поток управления), но требует осторожного управления общими ресурсами (мьютексы, атомарные операции) и может быть дорогостоящим по памяти при большом числе соединений (каждый поток — ~1–2 МБ стека). - Событийно-ориентированная модель на основе event loop — подход, заимствованный из Node.js и систем вроде Nginx. Здесь одна или несколько нитей управления обслуживают множество операций через цикл событий: операции регистрируются, event loop опрашивает их на готовность (например, через
select,epoll,kqueue), и при наступлении события вызывает соответствующий обработчик. Этот подход экономичен по памяти и масштабируем, но требует отказа от линейного потока управления — код становится «разорванным» на коллбэки или строится вокруг промисов/файберов.
До появления Fiber.scheduler события развивались параллельно: EventMachine, Celluloid, затем async/async-io, nio4r, concurrent-ruby. С Ruby 3.0 появилась возможность реализовать event loop внутри стандартной библиотеки, так что теперь можно писать асинхронный код, используя только встроенные средства — при условии, что используется совместимый планировщик.
Часть 2. Многопоточность в Ruby
В Ruby поток (Thread) — это объект, инкапсулирующий отдельную нить исполнения внутри одного процесса интерпретатора. Создание потока осуществляется вызовом Thread.new, которому передаётся блок кода. Этот блок начинает выполняться параллельно с основным потоком программы (точнее — конкурентно, как будет показано далее).
t = Thread.new do
puts "Работаю в фоновом потоке"
sleep 1
puts "Завершаю работу"
end
puts "Основной поток продолжает исполнение"
t.join # ожидание завершения потока t
puts "Поток t завершил работу"
Интуитивно кажется, что два потока работают одновременно. Однако из-за Global VM Lock (GVL) — внутреннего механизма синхронизации в MRI — только один поток Ruby-кода может выполняться в каждый момент времени. GVL — это мьютекс на уровне виртуальной машины, который должен быть захвачен перед исполнением любой инструкции байт-кода. Таким образом, истинный параллелизм на уровне CPU для чисто Ruby-кода невозможен в MRI.
Это утверждение часто вызывает недоумение, поскольку Ruby действительно позволяет создавать и запускать десятки, сотни потоков. GVL не блокирует всю систему, а лишь сериализует исполнение Ruby-операций. Как только поток входит в системный вызов, связанный с вводом-выводом (например, read, write, connect, accept, sleep), он освобождает GVL на время ожидания ответа от ядра ОС. В этот момент другие потоки могут захватить GVL и выполнять свой Ruby-код.
Следовательно, многопоточность в MRI эффективна только для задач с преобладанием операций ввода-вывода. Примеры:
- веб-сервер, обрабатывающий запросы с обращениями к базе данных;
- фоновый обработчик, скачивающий файлы по HTTP;
- сервис, подписанный на несколько очередей сообщений и обрабатывающий поступающие сообщения параллельно.
Для CPU-нагруженных задач (шифрование, сортировка больших массивов, численные расчёты на чистом Ruby) многопоточность в MRI не даёт выигрыша в скорости — более того, из-за накладных расходов на переключение контекста и синхронизацию производительность может ухудшиться. Для таких сценариев предпочтительны:
- использование нативных расширений (написанных на C, Rust и т.п.), которые могут освобождать GVL явно через
rb_thread_call_without_gvl; - запуск нескольких процессов Ruby (например, через
fork,Process.spawn, или менеджеры процессов вроде Puma в кластерном режиме); - переход на альтернативные реализации Ruby (JRuby, TruffleRuby), где GVL отсутствует или реализован иначе.
Жизненный цикл потока и основные методы
Поток в Ruby создаётся в состоянии runnable. После старта он переходит в running, если захватил GVL. Поток может перейти в состояние sleep (при вызове Thread.sleep, Queue.pop, ожидании на мьютексе) или dead (по завершении блока или при необработанном исключении).
Ключевые методы экземпляра Thread:
#join(timeout = nil)— приостанавливает текущий поток до завершения целевого. Если указан таймаут, ожидание прерывается по его истечении. Возвращает сам поток. Если поток завершился с исключением, оно не пробрасывается в вызывающий поток — его необходимо получить через#valueили#status.#value— эквивалентенjoin, но возвращает результат выполнения блока потока или пробрасывает исключение, с которым поток завершился. Это предпочтительный способ получения результата, если он нужен.#kill,#exit,#terminate— запрос на завершение потока. Важно: это асинхронный запрос, а не немедленное завершение. Поток будет остановлен при следующей точке прерывания (например, при входе в новую инструкцию). Нет гарантии, что поток остановится мгновенно, особенно если он находится в бесконечном цикле без точек прерывания.#alive?— возвращаетtrue, если поток ещё не завершён.#status— строка, описывающая состояние:"run","sleep","aborting"илиfalse(если поток мёртв). Если поток умер с исключением, статус —"aborting"до вызова#joinили#value, после чего становитсяfalse.Thread.current— ссылка на текущий поток. Часто используется для хранения контекста (например,Thread.current[:request_id] = idв веб-фреймворках).Thread.main— ссылка на главный поток программы.
Пример корректного получения результата с обработкой исключений:
t = Thread.new do
# имитация работы
sleep 0.1
raise "Ошибка в потоке" if rand < 0.5
"успех"
end
begin
result = t.value # вызывает join и возвращает результат или исключение
puts "Поток вернул: #{result}"
rescue => e
puts "Поток завершился с ошибкой: #{e.message}"
end
Обратите внимание: исключение, возникшее внутри потока, не прерывает основной поток. Оно остаётся «прикреплённым» к объекту потока и проявляется только при вызове #value или #join (в последнем случае — только при явной проверке через raise t.error после join). Это сознательный дизайн: он предотвращает случайное падение всей программы из-за сбоя в фоновом потоке, но требует от разработчика дисциплины в обработке ошибок.
Синхронизация доступа к разделяемым данным
Поскольку потоки разделяют адресное пространство процесса, они имеют общий доступ к глобальным переменным, константам, объектам, созданным в основном потоке, и, что особенно важно, к экземплярам изменяемых классов (Array, Hash, String, пользовательские объекты с изменяемым состоянием). Без синхронизации одновременные операции записи (или чтения во время записи) могут привести к состоянию гонки (race condition) — ситуации, когда итоговое состояние зависит от временного порядка выполнения инструкций, что делает поведение программы непредсказуемым.
Ruby предоставляет несколько примитивов синхронизации:
Mutex (взаимное исключение)
Наиболее часто используемый примитив. Гарантирует, что только один поток может находиться внутри критической секции в каждый момент времени.
mutex = Mutex.new
counter = 0
threads = 10.times.map do
Thread.new do
1000.times do
mutex.synchronize do
counter += 1
end
end
end
end
threads.each(&:join)
puts counter # гарантированно 10000
Без mutex.synchronize результат был бы меньше 10000 — из-за конфликтов при одновременном чтении и записи.
Метод #synchronize принимает блок и автоматически захватывает/освобождает мьютекс, включая случаи исключений (гарантированная разблокировка). Ручное использование #lock / #unlock не рекомендуется.
Queue (очередь с блокирующим извлечением)
Потокобезопасная очередь FIFO, реализующая шаблон producer–consumer. Основные методы:
#push(item)/<<— помещает элемент в очередь (не блокирует).#pop(non_block = false)— извлекает элемент; если очередь пуста иnon_block == false(по умолчанию), поток блокируется до появления элемента. Приnon_block == trueвыбрасываетThreadError, если очередь пуста.#size,#empty?,#clear— потокобезопасны.
Пример: фоновая обработка задач.
job_queue = Queue.new
# Потребитель
worker = Thread.new do
loop do
job = job_queue.pop # блокируется, пока нет задач
break if job == :stop
puts "Обрабатываю: #{job}"
sleep 0.01
end
end
# Производители
10.times do |i|
Thread.new { job_queue << "задача-#{i}" }
end
sleep 0.1
job_queue << :stop
worker.join
Queue — один из самых надёжных способов организации взаимодействия между потоками, поскольку он инкапсулирует всю логику синхронизации и позволяет избежать прямого доступа к разделяемым переменным.
ConditionVariable (условная переменная)
Позволяет потоку ждать наступления определённого условия, а не просто освобождения мьютекса. Используется совместно с Mutex.
Типичный шаблон: один или несколько потоков ожидают, пока условие (например, «очередь не пуста» или «ресурс освободился») станет истинным; другой поток, изменив состояние, уведомляет ожидающие.
mutex = Mutex.new
cv = ConditionVariable.new
items = []
producer = Thread.new do
5.times do |i|
mutex.synchronize do
items << "элемент-#{i}"
cv.signal # будит один ожидающий поток
end
sleep 0.05
end
end
consumer = Thread.new do
5.times do
mutex.synchronize do
# Ждём, пока items не станет непустым
cv.wait(mutex) while items.empty?
puts "Получил: #{items.shift}"
end
end
end
producer.join
consumer.join
#wait(mutex) освобождает переданный мьютекс и приостанавливает поток. При пробуждении (#signal или #broadcast) мьютекс автоматически захватывается обратно — поэтому проверка условия (while items.empty?) должна выполняться внутри synchronize.
Практические ограничения и ловушки многопоточности в MRI
-
Создание большого числа потоков дорого. Каждый поток в ОС требует выделения стека (по умолчанию ~1–2 МБ в Linux). При создании тысячи потоков потребление памяти достигает гигабайта — что делает этот подход неприменимым для высоконагруженных серверов с десятками тысяч соединений (C10K problem). В таких случаях предпочтительны event loop или гибридные модели (например, пул потоков + event loop внутри каждого потока).
-
Отладка состояний гонки сложна. Ошибки проявляются непредсказуемо, только при определённом чередовании потоков. Инструменты вроде
Thread.report_on_exception = true(выводит стек-трейс завершившегося с исключением потока в stderr) иThread.abort_on_exception = true(прерывает всю программу при исключении в любом потоке) помогают на ранних этапах, но не заменяют системного подхода к проектированию. -
Не все gems потокобезопасны. Многие библиотеки, особенно старые, предполагают однопоточное использование. Например, глобальное состояние в
OpenSSL, кэши вActiveSupport, соединения вNet::HTTP(экземплярNet::HTTPне предназначен для совместного использования между потоками). Перед использованием gem в многопоточной среде необходимо проверять его документацию на предмет thread-safety. -
Опасность deadlock’ов. Возникает, когда два потока удерживают по одному мьютексу и ждут освобождения другого. Ruby не имеет встроенного детектора deadlock’ов, хотя интерпретатор может обнаружить их при завершении программы и вывести предупреждение в stderr.
Часть 3. Событийно-ориентированная модель
Событийно-ориентированное программирование (event-driven programming) — это парадигма, в которой поток управления определяется поступлением событий: готовностью сокета к чтению/записи, истечением таймера, поступлением сообщения из очереди и т.п. В отличие от многопоточной модели, где каждая задача имеет собственный стек и «думает», что работает независимо, здесь все задачи разделяют один стек и исполняются по очереди, уступая управление друг другу в заранее определённых точках.
Ключевое преимущество такого подхода — минимальное потребление памяти. Поскольку нет сотен стеков потоков, приложение может одновременно обслуживать десятки или сотни тысяч соединений, используя всего несколько мегабайт RAM. Это делает event loop основой большинства высокопроизводительных сетевых серверов: Nginx, Redis, Node.js, и — в экосистеме Ruby — серверов типа Falcon, Iodine, или асинхронных клиентов вроде async-http.
Но как это работает на уровне ОС? Как Ruby-код «узнаёт», что сокет готов к чтению, не блокируя исполнение?
Неблокирующий I/O и системные вызовы мультиплексирования
Ядро операционной системы предоставляет интерфейсы для неблокирующего ввода-вывода. Основная идея: вместо того чтобы вызвать read(fd) и ждать, пока данные поступят в буфер (блокирующий режим), приложение:
- Переводит сокет в неблокирующий режим (
fcntl(fd, F_SETFL, O_NONBLOCK)в POSIX). - Регистрирует сокет в мультиплексоре событий — системном вызове, который может отслеживать готовность множества дескрипторов одновременно:
select— старейший, ограничен ~1024 дескрипторами, O(n) сложность.poll— снимает ограничение на число дескрипторов, но всё ещё O(n).epoll(Linux),kqueue(BSD/macOS) — масштабируемые интерфейсы с O(1) сложностью, поддерживающие тысячи и миллионы соединений.
Мультиплексор (epoll_wait, kevent) блокирует вызывающий поток — но только до первого события любого из зарегистрированных дескрипторов. Как только хотя бы один сокет становится готов к чтению или записи, вызов возвращает управление, и приложение может обработать все готовые дескрипторы за один проход.
Именно на этом строится event loop:
Инициализация:
— создаётся мультиплексор (epoll/kqueue)
— регистрируются начальные события (например, «ждать подключения на порту 80»)
Основной цикл:
1. Вызов epoll_wait(...) — блокируется до первого события
2. Получение списка готовых дескрипторов
3. Для каждого готового дескриптора:
- вызывается соответствующий обработчик (callback)
- обработчик выполняется до завершения (без прерываний!)
4. Возврат к шагу 1
Важнейшее следствие: обработчики событий должны быть максимально короткими и неблокирующими. Если обработчик займёт 100 мс на CPU-вычислениях, все остальные соединения «зависнут» на это время — ведь event loop не переключится на другую задачу, пока текущая не завершится. Это принципиальное отличие от многопоточности: в потоках «зависание» одного не останавливает другие; в event loop — останавливает весь цикл.
EventMachine
Библиотека EventMachine, появившаяся в середине 2000-х, стала первой попыткой принести event-driven модель в Ruby. Она написана на C++ и предоставляет Ruby API поверх epoll/kqueue/select.
Основные компоненты:
EM.run { ... }— запускает event loop (блокирует текущий поток доEM.stop).EM.connect(host, port, handler_class)— инициирует TCP-соединение; по готовности создаётся экземплярhandler_class.EM.start_server(port, handler_class)— запускает TCP-сервер; для каждого входящего соединения создаётсяhandler_class.
Обработчик (handler_class) должен наследовать EM::Connection и реализовывать методы вроде #post_init, #receive_data(data), #unbind, #send_data.
Пример простого HTTP-клиента:
require 'eventmachine'
class HttpClient < EM::Connection
def post_init
send_data "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
end
def receive_data(data)
@buffer ||= ""
@buffer << data
end
def unbind
puts "Получен ответ (#{[@buffer.bytesize]} байт)"
EM.stop
end
end
EM.run do
EM.connect "example.com", 80, HttpClient
end
Проблемы EventMachine:
- Сильная зависимость от глобального состояния (
EM.run— один на процесс). - Коллбэк-ориентированный стиль приводит к «callback hell»: вложенность, сложность обработки ошибок, трудности с композицией.
- Трудности с интеграцией с синхронным кодом (например, ActiveRecord).
- Отсутствие поддержки TLS «из коробки» (требуется
EM::Protocols::StartTLS). - Медленное развитие, прекращение активной поддержки.
Несмотря на это, EventMachine заложил основу понимания: асинхронность в Ruby возможна, и её можно реализовать эффективно.
Celluloid
Библиотека Celluloid (2010–2017) предложила другой подход — модель акторов. Каждый объект-актор выполняется в собственном потоке, но взаимодействие с ним происходит только через асинхронные сообщения. Celluloid автоматически оборачивал методы в потоки, предоставляя методы вроде async.some_method() и future.some_method().
class Worker
include Celluloid
def work(n)
sleep 0.1
n * n
end
end
worker = Worker.new
future = worker.future.work(5) # запускает work в фоновом потоке
puts future.value # блокируется до результата → 25
Celluloid был популярен в проектах вроде Sidekiq (до v6), но со временем выявил серьёзные недостатки:
- Отсутствие контроля над числом потоков (по умолчанию — пул, но легко создать «утечку» потоков).
- Сложность отладки: исключения в акторах могли «теряться».
- Высокие накладные расходы на межпоточное взаимодействие.
- Несовместимость с GVL в CPU-нагруженных сценариях.
- Прекращение поддержки в 2017 г.
Celluloid показал, что «магическое» скрытие конкурентности за интерфейсом объектов опасно: разработчик должен осознавать природу исполнения.
NIO.4R и async
Библиотека nio4r (New I/O for Ruby), созданная разработчиками JRuby, предоставляет тонкую обёртку над epoll/kqueue, без глобального состояния. Она не определяет поведение приложения — только даёт инструмент: NIO::Selector, NIO::Monitor, #select(timeout).
Это позволило построить более гибкие системы, в частности — async (автор Samuel Williams), который стал де-факто стандартом для современной асинхронности в Ruby.
Ключевые идеи async:
- Нет глобального event loop’а. Event loop создаётся явно через
Async::Reactor.run { ... }и может быть вложен (reactor-in-reactor). - Fiber как единица конкурентности. Каждая асинхронная задача выполняется в отдельном
Fiber, что позволяет писать линейный код сawait-подобной семантикой. - Интеграция с
Fiber.scheduler(начиная с Ruby 3.0). Библиотекаasyncможет работать как с собственным scheduler’ом, так и с встроенным — обеспечивая совместимость. - Постепенная асинхронность: можно начать с синхронного кода и постепенно «обернуть» части в
Async do ... end.
Пример на async:
require 'async'
require 'async/http/internet'
Async do
internet = Async::HTTP::Internet.new
3.times do |i|
Async do
response = internet.get("https://httpbin.org/delay/1")
puts "Запрос #{i}: статус #{response.status}"
end
end
end
# Все 3 запроса выполняются параллельно, но в одном потоке.
Здесь Async do ... end создаёт новую задачу (task), которая выполняется в отдельном Fiber. Вызов internet.get — кооперативно неблокирующий: если данные ещё не готовы, Fiber приостанавливается (Fiber.yield), управление возвращается в event loop, и другие задачи получают шанс выполниться. Как только сокет готов, Fiber возобновляется с того места, где был остановлен.
Это кооперативная многозадачность: переключение происходит только в явно обозначенных точках (всегда внутри вызовов асинхронных методов), а не принудительно (как в потоках). Это устраняет необходимость в мьютексах для большинства случаев — так как в каждый момент активен только один Fiber, состояние гонки при доступе к локальным переменным невозможна. Однако разделяемые ресурсы (глобальные переменные, соединения с БД) всё ещё требуют синхронизации.
Архитектурные последствия событийной модели
-
Отказ от блокирующих вызовов. Любой вызов
sleep,Net::HTTP.get,File.read,DB.queryвнутри асинхронного контекста останавливает весь event loop. Поэтому необходимы асинхронные аналоги:Async.sleep,Async::HTTP,Aysnc::Redis,rom-sqlс асинхронным адаптером и т.п. -
Обработка исключений требует явного управления. В коллбэк-стиле исключение в обработчике может упасть «в никуда». В
asyncкаждая задача имеет изолированный контекст: исключение, не перехваченное внутриAsync do ... end, завершает только эту задачу, но не весь reactor. -
Сложность интеграции с синхронным кодом. ORM, HTTP-клиенты, драйверы БД, написанные без учёта асинхронности, не могут быть просто «вставлены» в event loop. Требуется либо их замена на асинхронные аналоги, либо вынос в пул потоков (например, через
Async::ThreadPool). Это — главный барьер на пути массового внедрения. -
Преимущество в I/O-нагрузке, уязвимость к CPU. Event loop блестяще справляется с тысячами сетевых соединений, но один тяжёлый расчёт «заморозит» всю систему. Решение — вынос CPU-нагрузки в отдельные процессы или потоки.
Часть 4. Fiber и Fiber.scheduler
Что такое Fiber и чем он отличается от Thread
Fiber — это примитив кооперативной многозадачности, существующий в Ruby с версии 1.9. В отличие от Thread, который управляется планировщиком операционной системы и может быть прерван в любой момент (вытесняющая многозадачность), Fiber переключается только по инициативе самого кода (кооперативная многозадачность). Это означает:
- В один момент времени активен строго один
Fiberв рамках одного потока. - Переключение происходит только при явном вызове
Fiber.yield,Fiber#resume, или — в случаеFiber.scheduler— при входе в «хукируемую» операцию ввода-вывода. - У
Fiberнет отношения к GVL: поскольку он не является нативным потоком, он не конкурирует за захват GVL — он просто выполняется внутри того потока, который его запустил.
Создание и управление Fiber вручную:
f = Fiber.new do
puts "1: начало"
x = Fiber.yield 100 # приостанавливает fiber, возвращает 100 вызвавшему
puts "3: возобновление с аргументом #{x}"
"результат"
end
puts "0: до resume"
v1 = f.resume # → "1: начало", затем возвращает 100
puts "2: после первого resume: #{v1}"
v2 = f.resume("арг") # → "3: возобновление...", затем возвращает "результат"
puts "4: итог: #{v2}"
Важно: Fiber — это легковесная структура. Её стек хранится в куче и занимает килобайты (а не мегабайты, как у Thread). Можно безопасно создавать десятки тысяч Fiber в одном процессе.
Однако ручное управление Fiber неудобно: код становится фрагментированным, как при использовании коллбэков. Ключевой прорыв — делегирование управления Fiber внешнему планировщику (scheduler), который автоматически приостанавливает и возобновляет Fiber при операциях ввода-вывода.
Fiber.scheduler
Начиная с Ruby 3.0, в стандартной библиотеке появился модуль Fiber.scheduler. Это — протокол, которому должен следовать объект-планировщик. Разработчик может написать собственный scheduler или использовать готовый (например, из гема async).
Планировщик устанавливается глобально для текущего потока:
Fiber.set_scheduler(MyScheduler.new)
После этого определённые системные вызовы, выполняемые внутри Fiber, перехватываются и заменяются на неблокирующие аналоги, управляемые планировщиком. К таким вызовам относятся:
Kernel#sleepIO#wait_readable,IO#wait_writable,IO#waitIO#read,IO#write,IO#readpartial,IO#sysread,IO#syswriteSocket#connect,Socket#acceptProcess.wait,Process.waitpidThread.join
Каждый из этих методов, если вызван внутри Fiber, при наличии установленного scheduler’а, передаёт управление методу планировщика с соответствующим именем (например, #kernel_sleep, #io_wait, #io_read). Планировщик может:
- Асинхронно зарегистрировать операцию в event loop (например, добавить сокет в
epollс ожиданиемEPOLLIN). - Приостановить текущий
FiberчерезFiber.yield. - Вернуть управление в event loop, чтобы он ожидал событий.
- По готовности операции — возобновить
Fiber, передав результат или исключение.
Таким образом, код, написанный в привычном синхронном стиле:
Fiber.set_scheduler(Async::Scheduler.new)
Fiber.schedule do
io = TCPSocket.new("example.com", 80)
io.write "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
response = io.read
puts response
end
— на самом деле выполняется асинхронно. Вызовы TCPSocket.new, write, read внутри Fiber.schedule (это синоним Fiber.new(...).resume) перехватываются Async::Scheduler, который:
- заменяет
connectна неблокирующийconnect+ регистрацию вepoll; - при неготовности сокета к записи — приостанавливает
Fiber; - при наступлении события
EPOLLOUT— возобновляетFiber, иwriteпродолжает работу; - аналогично для
read.
Для программиста — никаких коллбэков, никаких промисов. Только линейный поток управления.
Требования к «асинхронной совместимости» gems
Гем считается асинхронно совместимым, если он:
- Использует только хукируемые операции при работе с I/O. То есть не вызывает напрямую системные функции через
FFI,C extensionsбез освобождения GVL, или приватные методы вродеIO#read_nonblockбез проверки scheduler’а. - Не блокирует поток надолго в CPU-операциях. Если гем делает тяжёлые вычисления, он должен предоставлять способ их выноса (например, через
Async::ThreadPool). - Не хранит глобальное состояние, зависящее от потока, без учёта
Fiber. Например, кэши, привязанные кThread.current, будут нарушены, если задачи переключаются междуFiberв одном потоке. Корректный подход — использоватьFiber.current[:key]или передавать контекст явно.
Примеры совместимых гемов:
async(ядро)async-http,async-io,async-containerasync-redis,async-postgres(нативные асинхронные драйверы)falcon(веб-сервер)protocol-http,protocol-http2
Примеры несовместимых (без адаптации):
net/http— использует блокирующиеIO#readбез проверки scheduler’а.pg,redis(официальные драйверы) — написаны на C, вызывают блокирующие системные вызовы.ActiveRecord— зависит от синхронных адаптеров, глобального состояния соединений.
Однако даже несовместимые гемы можно использовать — с оговорками. Например, через Async::ThreadPool:
Async::ThreadPool.io do
response = Net::HTTP.get(URI("https://example.com"))
end
Этот вызов выполняет Net::HTTP.get в отдельном потоке из пула, освобождая event loop на время ожидания. Это не даёт преимуществ по памяти, но позволяет интегрировать legacy-код.
Почему Ractor — не решение для асинхронности
С Ruby 3.0 также появился Ractor — механизм изоляции данных по принципу акторов, предназначенный для параллелизма на уровне CPU. Часто возникает путаница: «Ractor заменит асинхронность».
Это неверно. Различия фундаментальны:
| Критерий | Fiber + Fiber.scheduler | Ractor |
|---|---|---|
| Цель | Эффективная обработка I/O | Использование нескольких ядер CPU |
| Управление | Кооперативное (внутри одного потока) | Вытесняющее (по потокам ОС) |
| Изоляция | Отсутствует (общая память) | Строгая (копирование/передача) |
| Стоимость создания | Очень низкая (кБ) | Высокая (МБ, fork-накладные) |
| GVL | Не влияет (работает внутри потока) | Обходит GVL (каждый Ractor — отдельный GVL) |
| Типичное применение | Веб-серверы, клиенты API, обработка событий | Численные расчёты, парсинг, ML |
Ractor не помогает при работе с сетью: создание 10 000 Ractor’ов для 10 000 соединений невозможно из-за потребления памяти и накладных расходов на IPC. Асинхронность на Fiber остаётся единственным практичным решением для I/O-bound задач.
Практические ограничения Fiber.scheduler
- Планировщик устанавливается на поток, а не на процесс. Это позволяет смешивать синхронный и асинхронный код в одном приложении (например, один поток — main thread для Rails, другой — reactor thread для асинхронных фоновых задач).
- Нельзя вложить
Fiber.set_schedulerбез явной поддержки. Некоторые планировщики (например,async) поддерживают вложенность:Async do; Async do; ... end; end, другие — нет. - Не все операции хукируются. Например,
File.open(...).readне перехватывается по умолчанию — только если файловый дескриптор уже открыт в неблокирующем режиме и используется черезIO#read. Для файлов это редко практично (дисковый I/O редко бывает «долгим ожиданием», как сеть), поэтому их обычно выносят в пул потоков. - Отладка сложнее. Стек вызовов «размазан» по нескольким
Fiber, инструменты вродеpryилиbyebugмогут вести себя нестабильно внутри reactor’а.
Часть 6. Антишаблоны и типичные ошибки
1. Блокирующие вызовы внутри асинхронного контекста
Самая распространённая и опасная ошибка — использование любого блокирующего системного вызова внутри Fiber, управляемого Fiber.scheduler. Последствия неочевидны: приложение не падает, тесты проходят, но под нагрузкой latency резко возрастает, а пропускная способность падает до уровня синхронного сервера.
Примеры блокирующих вызовов:
| Вызов | Почему блокирует | Асинхронная замена |
|---|---|---|
Net::HTTP.get(...) | Использует IO#read без проверки scheduler’а | Async::HTTP::Internet#post |
File.read("large.log") | Синхронное чтение с диска | Чтение по частям + Async::ThreadPool.io, или IO.copy_stream с неблокирующим дескриптором |
sleep(1) | Блокирует поток на 1 с | Async.sleep(1) |
DB.query(...) (через pg, mysql2) | Вызов нативной функции без освобождения GVL | async-postgres, async-mysql или Async::ThreadPool.io |
OpenSSL::SSL::SSLSocket#connect | Блокирующий handshake | async-io + Async::IO::SSLSocket |
Диагностика:
- Измеряйте время выполнения отдельных операций под нагрузкой.
- Используйте
Async::Timeout(0.1) { ... }вокруг подозрительных участков — если часто происходит timeout, где не должен, — есть блокировка. - Профилируйте с помощью
async-profilerилиrbtrace, фильтруя поRSTRING/RARRAY— долгие удержания GVL в I/O-коде — признак проблемы.
Антишаблон:
«Обернули Rails-контроллер в
Async do, и теперь все внешние вызовы параллельны».
Реальность: ActiveRecord внутри по-прежнему использует pg, который блокирует. Вместо ускорения — все запросы теперь «ждут друг друга» в одном event loop, что хуже, чем в Puma с thread pool.
Правильный подход:
- Асинхронность — на границе системы (клиенты, серверы), а не в бизнес-логике.
- Если нужно вызвать синхронный gem — выносите его в
Async::ThreadPool.io. - Пишите адаптеры:
class AsyncEmailService < SyncEmailService::AsyncAdapter.
2. Утечки задач и ресурсов: «забытые» Fiber
В отличие от потоков, Fiber не управляется автоматически сборщиком мусора, если на него есть ссылка. Задача (Async::Task), не завершившаяся корректно, может удерживать:
- соединения с БД или HTTP-серверами;
- буферы памяти (например, для streaming-ответов);
- таймеры и подписки на события.
Пример утечки:
Async do
loop do
Async do
# Запрос без таймаута и без проверки ответа
Async::HTTP::Internet.new.get("https://slow-api.example.com")
end
Async.sleep(0.1)
end
end
Если slow-api не отвечает, каждый Async do создаёт задачу, которая висит в состоянии pending бесконечно. Память и дескрипторы соединений исчерпываются.
Защитные меры:
- Всегда используйте
Async.timeoutдля внешних вызовов:Async.timeout(5) { client.get(...) } - Явно отменяйте задачи при завершении родителя:
task = Async do ... end
# ...
task.cancel if task.alive? - Используйте
Async::Barrierдля гарантированного завершения группы задач:barrier = Async::Barrier.new
10.times { barrier.async { ... } }
barrier.wait # ждёт все или до ошибки
3. Неправильная обработка исключений
Поведение исключений в асинхронном контексте отличается от синхронного:
- Исключение внутри
Async do ... endне прерывает другие задачи и не падает в основной поток. - Если не вызван
task.waitилиtask.value, исключение остаётся «незамеченным». - При использовании
Async::ThreadPoolисключение может «потеряться» в другом потоке.
Антишаблон:
Async do
internet.get("https://unreachable")
# Исключение будет прикреплено к задаче, но никто его не увидит
end
# Программа завершается успешно — ошибка проигнорирована
Решение:
- Всегда вызывайте
task.waitили оборачивайте вbegin/rescue. - Используйте
Async::Task#failureдля проверки:task = Async do ... end
task.wait
if task.failed?
log_error(task.failure)
end - Настройте глобальный обработчик для неперехваченных исключений:
Async do |task|
task.annotate_exception = true # добавляет контекст в backtrace
end.exception_handler do |task, exception|
Sentry.capture_exception(exception)
end
4. Ложные ожидания от Ractor
Ошибка:
«Создадим 10 000
Ractor’ов для 10 000 соединений — будет быстро и масштабируемо».
Реальность:
- Создание
Ractorтребует fork-подобной инициализации (копирование состояния или передача через каналы). - Каждый
Ractorимеет собственный GVL, но и собственный overhead (стек, IPC-буферы). - Передача сообщений между
Ractor’ами — сериализация черезMarshal, что дорого для больших объектов. - Сетевые операции внутри
Ractorпо-прежнему блокируют его поток — вы не получаете выигрыша от event loop.
Когда Ractor уместен:
- Разделение CPU-нагрузки: парсинг, шифрование, численные методы.
- Изоляция ненадёжного кода (например, загрузка плагинов).
- Обработка потоков данных в pipeline:
Ractor-парсер →Ractor-валидатор →Ractor-сохранение.
Не используйте Ractor для I/O. Это не замена Fiber.scheduler.
5. Состояние гонки в «кажущейся» однопоточности
Многие разработчики полагают: «в одном Fiber — нет гонок». Это верно только для локальных переменных. Однако:
- Глобальные переменные (
$global), классовые переменные (@@class_var), инстанс-переменные общих объектов (@shared_cache) — по-прежнему разделяются междуFiber. Fiber.current[:key]— не заменаThread.current[:key], еслиFiberсоздаются вручную без учёта иерархии.
Пример:
cache = {}
Async do
100.times do
Async do
key = rand(10)
unless cache.key?(key)
# Гонка: два Fiber могут пройти проверку одновременно
cache[key] = expensive_computation(key)
end
end
end
end
Решение:
- Используйте
Async:: Mutex(изasyncgem) для критических секций:mutex = Async:: Mutex.new
mutex.async do
unless cache.key?(key)
cache[key] = expensive_computation(key)
end
end - Предпочитайте неизменяемые структуры данных (
Hamster::Hash). - Избегайте глобального состояния — передавайте контекст явно.
6. «Асинхронное спагетти»
При чрезмерном использовании вложенных Async do и await-подобных операций (в гемах, эмулирующих await) код может стать нечитаемым:
Async do
user = fetch_user(id)
if user
Async do
profile = fetch_profile(user)
Async do
friends = fetch_friends(profile)
# ...
end
end
end
end
Это — возврат к callback hell, только с Fiber.
Правильный стиль:
- Одна задача — одна единица ответственности.
- Используйте возврат значений и
task.waitдля композиции:user_task = Async { fetch_user(id) }
profile_task = Async { fetch_profile(user_task.wait) }
friends_task = Async { fetch_friends(profile_task.wait) }
friends_task.wait - Для последовательных операций — просто линейный код внутри одного
Async do:Async do
user = fetch_user(id)
profile = fetch_profile(user)
friends = fetch_friends(profile)
friends
end
7. Игнорирование backpressure
Асинхронность позволяет легко создать 10 000 параллельных запросов к внешнему API. Но если API не выдержит — начнётся каскадный сбой.
Антишаблон:
«Запустили 10 000 параллельных запросов к SMS-шлюзу — все ушли, но 9 500 получили 429 Too Many Requests».
Решение:
- Используйте семафоры для ограничения параллелизма:
semaphore = Async:: Semaphore.new(10) # не более 10 одновременных запросов
tasks = users.map do |user|
Async do
semaphore.async do
send_sms(user)
end
end
end - Реализуйте circuit breaker (например,
semianилиasync-circuit). - Поддерживайте retry с экспоненциальной задержкой (
Async::Retry).